package org.codehaus.activemq.store.bdb;

import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryCursor;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.Transaction;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.AlreadyClosedException;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * A {@link MessageStore} that persists messages in a Berkeley DB (JE) database.
 *
 * <p>Messages are written to the primary {@link Database} keyed by the bytes of
 * their JMS message ID. A {@link SecondaryDatabase} is used to walk and delete
 * messages by sequence number; the sequence number handed back to callers is a
 * {@link DatabaseEntry} obtained from the {@link SequenceNumberCreator}.
 *
 * <p>Once {@link #stop()} has run the database references are cleared and the
 * message operations throw {@link AlreadyClosedException}.
 */
public class BDbMessageStore
  implements MessageStore
{
  private static final Log log = LogFactory.getLog(BDbMessageStore.class);

  /** Primary database (message ID to marshalled message); null once stopped. */
  private Database database;
  private final WireFormat wireFormat;
  /** Secondary database used for sequence-number access; null once stopped. */
  private SecondaryDatabase secondaryDatabase;
  private final SecondaryConfig secondaryConfig;
  private final SequenceNumberCreator sequenceNumberCreator;
  private MessageContainer container;
  /**
   * Configuration passed when opening cursors. It is never assigned in this
   * class, so cursors are opened with a null configuration
   * (presumably the JE defaults -- confirm against the JE documentation).
   */
  private CursorConfig cursorConfig;

  /**
   * Creates a store on top of already opened databases.
   *
   * @param database the primary database holding the marshalled messages
   * @param secondaryDatabase the secondary database used for sequence-number access
   * @param secondaryConfig the configuration of the secondary database
   * @param sequenceNumberCreator supplies the sequence number of the last stored message
   * @param wireFormat used to marshal and unmarshal messages
   */
  public BDbMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat)
  {
    this.database = database;
    this.secondaryDatabase = secondaryDatabase;
    this.secondaryConfig = secondaryConfig;
    this.sequenceNumberCreator = sequenceNumberCreator;
    this.wireFormat = wireFormat;
  }

  /**
   * Sets the container this store belongs to.
   *
   * @param container the owning message container
   */
  public void setMessageContainer(MessageContainer container) {
    this.container = container;
  }

  /**
   * Writes the message to the primary database under its JMS message ID.
   *
   * @param message the message to persist
   * @return the message's identity, updated with the sequence number reported
   *         by the {@link SequenceNumberCreator} after the write
   * @throws JMSException if the store is closed or the write/marshalling fails
   */
  public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
    checkClosed();
    String messageID = message.getJMSMessageID();
    try {
      Transaction transaction = BDbHelper.getTransaction();
      DatabaseEntry key = createKey(messageID);
      DatabaseEntry value = new DatabaseEntry(asBytes(message));
      this.database.put(transaction, key, value);

      MessageIdentity answer = message.getJMSMessageIdentity();
      answer.setSequenceNumber(this.sequenceNumberCreator.getLastKey());
      return answer;
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
    }
  }

  /**
   * Looks a message up in the primary database by its message ID.
   *
   * @param identity identifies the message to load
   * @return the unmarshalled message, or null if the lookup did not succeed
   * @throws JMSException if the store is closed or the read/unmarshalling fails
   */
  public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException
  {
    checkClosed();
    ActiveMQMessage answer = null;
    String messageID = identity.getMessageID();
    try {
      DatabaseEntry key = createKey(messageID);
      DatabaseEntry value = new DatabaseEntry();
      if (this.database.get(null, key, value, null) == OperationStatus.SUCCESS) {
        answer = extractMessage(value);
      }
      return answer;
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
    }
  }

  /**
   * Deletes a message through the secondary database using its sequence number.
   *
   * <p>If the identity carries no sequence number it is looked up with a table
   * scan. When no sequence number can be found, or the delete does not report
   * success, the problem is logged and the method returns normally.
   *
   * @param identity identifies the message to delete
   * @param ack the acknowledgement that triggered the removal (not used here)
   * @throws JMSException if the store is closed or the database reports an error
   */
  public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException
  {
    checkClosed();
    String messageID = identity.getMessageID();
    try {
      Transaction transaction = BDbHelper.getTransaction();

      DatabaseEntry sequenceNumber = getSequenceNumberKey(identity);
      if (sequenceNumber == null) {
        // The message is not in the store; there is no key to delete with.
        log.error("Could not find sequence number for: " + identity + "; nothing deleted");
        return;
      }

      this.sequenceNumberCreator.setDeleteKey(sequenceNumber);

      OperationStatus status = this.secondaryDatabase.delete(transaction, sequenceNumber);
      if (status != OperationStatus.SUCCESS) {
        log.error("Could not delete sequenece number for: " + identity + " status: " + status);
      }
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
    }
  }

  /**
   * Walks the secondary database from the first record onwards and hands the
   * identity of every stored message to the container.
   *
   * @param container receives one {@link MessageIdentity} per stored message
   * @throws JMSException if the store is closed or the database reports an error
   */
  public void recover(QueueMessageContainer container) throws JMSException {
    checkClosed();
    SecondaryCursor cursor = null;
    try {
      cursor = this.secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), this.cursorConfig);
      DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
      DatabaseEntry keyEntry = new DatabaseEntry();
      DatabaseEntry valueEntry = new DatabaseEntry();
      OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS) {
        String messageID = extractString(keyEntry);
        container.recoverMessageToBeDelivered(new MessageIdentity(messageID, sequenceNumberEntry));
        // Use a fresh entry for the next row: the identity just handed out keeps a
        // reference to this one, and the cursor would otherwise overwrite it.
        sequenceNumberEntry = new DatabaseEntry();
        status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
      }
      if (status != OperationStatus.NOTFOUND) {
        log.warn("Unexpected status code while recovering: " + status);
      }
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
    }
    finally {
      closeCursor(cursor);
    }
  }

  /**
   * Does nothing; the databases are supplied already open to the constructor.
   *
   * @throws JMSException never thrown by this implementation
   */
  public void start()
    throws JMSException
  {
  }

  /**
   * Closes the secondary and then the primary database and clears both
   * references, which marks the store as closed.
   *
   * @throws JMSException the exception returned by the close helper, if any
   */
  public void stop() throws JMSException
  {
    JMSException firstException = BDbPersistenceAdapter.closeDatabase(this.secondaryDatabase, null);
    firstException = BDbPersistenceAdapter.closeDatabase(this.database, firstException);

    this.secondaryDatabase = null;
    this.database = null;

    if (firstException != null) {
      throw firstException;
    }
  }

  /** @return the secondary database, or null once the store has been stopped */
  protected SecondaryDatabase getSecondaryDatabase()
  {
    return this.secondaryDatabase;
  }

  /** @return the primary database, or null once the store has been stopped */
  protected Database getDatabase() {
    return this.database;
  }

  /** @return the cursor configuration used when opening cursors; may be null */
  public CursorConfig getCursorConfig() {
    return this.cursorConfig;
  }

  /** @return the container set through {@link #setMessageContainer}, or null */
  public MessageContainer getContainer() {
    return this.container;
  }

  /**
   * Verifies that the store has not been stopped.
   *
   * @throws AlreadyClosedException if the primary database reference is null
   */
  protected void checkClosed() throws AlreadyClosedException
  {
    if (this.database == null) {
      throw new AlreadyClosedException("Berkeley DB MessageStore");
    }
  }

  /**
   * Returns the sequence number stored on the identity, falling back to a table
   * scan when the identity does not carry one.
   *
   * @param identity the message identity to resolve
   * @return the sequence number entry, or null if the scan finds no match
   * @throws DatabaseException if the table scan fails
   */
  protected DatabaseEntry getSequenceNumberKey(MessageIdentity identity)
    throws DatabaseException
  {
    DatabaseEntry sequenceNumber = (DatabaseEntry)identity.getSequenceNumber();
    if (sequenceNumber == null) {
      sequenceNumber = findSequenceNumber(identity.getMessageID());
    }
    return sequenceNumber;
  }

  /**
   * Builds the primary-database key for a message ID.
   *
   * @param messageID the JMS message ID
   * @return an entry wrapping the bytes of the ID
   */
  protected DatabaseEntry createKey(String messageID) {
    return new DatabaseEntry(asBytes(messageID));
  }

  /**
   * Scans the secondary database from the first record for the given message ID.
   *
   * @param messageID the JMS message ID to search for
   * @return the sequence number entry of the first matching record, or null if
   *         no record matches
   * @throws DatabaseException if opening or moving the cursor fails
   */
  protected DatabaseEntry findSequenceNumber(String messageID)
    throws DatabaseException
  {
    log.warn("Having to table scan to find the sequence number for messageID: " + messageID);

    SecondaryCursor cursor = null;
    try {
      cursor = this.secondaryDatabase.openSecondaryCursor(BDbHelper.getTransaction(), this.cursorConfig);
      DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
      DatabaseEntry keyEntry = new DatabaseEntry();
      DatabaseEntry valueEntry = new DatabaseEntry();
      OperationStatus status = cursor.getFirst(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS) {
        if (messageID.equals(extractString(keyEntry))) {
          // Found it; the finally block still closes the cursor.
          return sequenceNumberEntry;
        }
        status = cursor.getNext(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
      }
      return null;
    }
    finally {
      closeCursor(cursor);
    }
  }

  /**
   * Decodes the bytes of an entry into a string using the platform default
   * charset, mirroring {@link #asBytes(String)}.
   *
   * @param entry the entry to decode
   * @return the decoded string
   */
  protected String extractString(DatabaseEntry entry) {
    return new String(entry.getData(), entry.getOffset(), entry.getSize());
  }

  /**
   * Unmarshals a message from an entry while holding the wire format's monitor.
   *
   * @param value the entry holding the marshalled message
   * @return the unmarshalled message
   * @throws IOException if the wire format cannot read the bytes
   */
  protected ActiveMQMessage extractMessage(DatabaseEntry value) throws IOException
  {
    synchronized (this.wireFormat) {
      return (ActiveMQMessage)this.wireFormat.fromBytes(value.getData(), value.getOffset(), value.getSize());
    }
  }

  /**
   * Marshals a message to bytes while holding the wire format's monitor.
   *
   * @param message the message to marshal
   * @return the marshalled bytes
   * @throws IOException if the wire format fails to write the message
   * @throws JMSException if the wire format rejects the message
   */
  protected byte[] asBytes(ActiveMQMessage message) throws IOException, JMSException
  {
    synchronized (this.wireFormat) {
      return this.wireFormat.toBytes(message);
    }
  }

  /**
   * Encodes a message ID using the platform default charset.
   *
   * @param messageID the JMS message ID
   * @return the encoded bytes
   */
  protected byte[] asBytes(String messageID) {
    return messageID.getBytes();
  }

  /** Closes the cursor if one was opened, logging rather than propagating any failure. */
  private void closeCursor(SecondaryCursor cursor) {
    if (cursor == null) {
      return;
    }
    try {
      cursor.close();
    }
    catch (DatabaseException e) {
      log.warn("Caught exception closing cursor: " + e, e);
    }
  }
}